 1.Flink Time之Time
   
   在Flink的流式处理中，会涉及到时间的不同概念，如下图所示：
   - EventTime[事件时间]
   事件发生的时间，例如：点击网站上的某个链接的时间，每一条日志都会记录自己的生成时间
   如果以EventTime为基准来定义时间窗口那将形成EventTimeWindow,要求消息本身就应该携带
EventTime
   - IngestionTime[摄入时间]
   数据进入Flink的时间，如某个Flink节点的source operator接收到数据的时间，例如：某个source消费
到kafka中的数据
   如果以IngesingtTime为基准来定义时间窗口那将形成IngestingTimeWindow,以source的systemTime
为准
   - ProcessingTime[处理时间]
   某个Flink节点执行某个operation的时间，例如：timeWindow处理数据时的系统时间，默认的时间属
性就是Processing Time
   如果以ProcessingTime基准来定义时间窗口那将形成ProcessingTimeWindow，以operator的
systemTime为准
   在Flink的流式处理中，绝大部分的业务都会使用EventTime，一般只在EventTime无法使用时，才会被
迫使用ProcessingTime或者IngestionTime。
   如果要使用EventTime，那么需要引入EventTime的时间属性，引入方式如下所示：
   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //设置使用事件时间
 
 2.数据延迟产生的问题
   
   l 示例1
   现在假设，你正在去往地下停车场的路上，并且打算用手机点一份外卖。
   选好了外卖后，你就用在线支付功能付款了，这个时候是11点50分。恰好这时，你走进了地下停车库，
而这里并没有手机信号。因此外卖的在线支付并没有立刻成功，而支付系统一直在Retry重试“支付”这个
操作。
   当你找到自己的车并且开出地下停车场的时候，已经是12点05分了。这个时候手机重新有了信号，手机
上的支付数据成功发到了外卖在线支付系统，支付完成。
   在上面这个场景中你可以看到，支付数据的事件时间是11点50分，而支付数据的处理时间是12点05分
   一般在实际开发中会以事件时间作为计算标准
   l 示例2
   一条日志进入Flink的时间为2019-08-12 10:00:01,摄入时间
   到达Window的系统时间为2019-08-12 10:00:02,处理时间
   日志的内容为：2019-08-12 09:58:02 INFO Fail over to rm2 ,事件时间
   对于业务来说，要统计1h内的故障日志个数，哪个时间是最有意义的？---事件时间
   EventTime，因为我们要根据日志的生成时间进行统计。
   l 示例3
   某 App 会记录用户的所有点击行为，并回传日志（在网络不好的情况下，先保存在本地，延后回
传）。
   A 用户在 11:02 对 App 进行操作，B 用户在 11:03 操作了 App
   但是 A 用户的网络不太稳定，回传日志延迟了，导致我们在服务端先接受到 B 用户 11:03 的消息，然
后再接受到 A 用户 11:02 的消息，消息乱序了。
   l 示例4
   在实际环境中，经常会出现，因为网络原因，数据有可能会延迟一会才到达Flink实时处理系统。
   我们先来设想一下下面这个场景:
   1. 使用时间窗口来统计10分钟内的用户流量
   2. 有一个时间窗口
   - 开始时间为：2017-03-19 10:00:00
   - 结束时间为：2017-03-19 10:10:00
   3. 有一个数据，因为网络延迟
   - 事件发生的时间为：2017-03-19 10:10:00
   - 但进入到窗口的时间为：2017-03-19 10:10:02，延迟了2秒中
   4. 时间窗口并没有将59这个数据计算进来，导致数据统计不正确
   
   这种处理方式，根据消息进入到window时间，来进行计算。在网络有延迟的时候，会引起计算误差。
   如何解决?---使用水印解决网络延迟问题
   通过上面的例子,我们知道,在进行数据处理的时候应该按照事件时间进行处理,也就是窗口应该要考虑到
事件时间
   但是窗口不能无限的一直等到延迟数据的到来,需要有一个触发窗口计算的机制
   也就是我们接下来要学的watermaker水位线/水印机制
   
 3.使用Watermark解决
   
   水印(watermark)就是一个时间戳，Flink可以给数据流添加水印，
   可以理解为：收到一条消息后，额外给这个消息添加了一个时间字段，这就是添加水印。
   - 水印并不会影响原有Eventtime事件时间
   当数据流添加水印后，会按照水印时间来触发窗口计算
   也就是说watermark水印是用来触发窗口计算的
   - 一般会设置水印时间，比事件时间小几秒钟,表示最大允许数据延迟达到多久
   (即水印时间 = 事件时间 - 允许延迟时间)10:09:57 = 10:10:00 - 3s 
   - 当接收到的 水印时间 >= 窗口结束时间，则触发计算 如等到一条数据的水印时间为10:10:00 >=
10:10:00 才触发计算,也就是要等到事件时间为10:10:03 的数据到来才触发计算
   (即事件时间 - 允许延迟时间 >= 窗口结束时间 或 事件时间 >= 窗口结束时间 + 允许延迟时间)
   
   总结:watermaker是用来解决延迟数据的问题
   如窗口10:00:00~10:10:00
   而数据到达的顺序是: A 10:10:00 ,B 10:09:58
   如果没有watermaker,那么A数据将会触发窗口计算,B数据来了窗口已经关闭,则该数据丢失
   那么如果有了watermaker,设置允许数据迟到的阈值为3s
   那么该窗口的结束条件则为 水印时间>=窗口结束时间10:10:00,也就是需要有一条数据的水印时间=
10:10:00
   而水印时间10:10:00= 事件时间- 延迟时间3s
   也就是需要有一条事件时间为10:10:03 的数据到来,才会真正的触发窗口计算
   而上面的 A 10:10:00 ,B 10:09:58 都不会触发计算,也就是会被窗口包含,直到10:10:03 的数据到来才会计
算窗口10:00:00~10:10:00 的数据
   Watermark案例
   步骤：
   1、 获取数据源
   2、 转化
   3、 声明水印（watermark）
   4、 分组聚合，调用window的操作
   5、 保存处理结果
   注意：
   当使用EventTimeWindow时，所有的Window在EventTime的时间轴上进行划分，
   也就是说，在Window启动后，会根据初始的EventTime时间每隔一段时间划分一个窗口，
   如果Window大小是3秒，那么1分钟内会把Window划分为如下的形式：
[00:00:00,00:00:03)
[00:00:03,00:00:06)
[00:00:03,00:00:09)
[00:00:03,00:00:12)
[00:00:03,00:00:15)
[00:00:03,00:00:18)
[00:00:03,00:00:21)
[00:00:03,00:00:24)
...
[00:00:57,00:00:42)
[00:00:57,00:00:45)
[00:00:57,00:00:48)
...
   
   如果Window大小是10秒，则Window会被分为如下的形式：
[00:00:00,00:00:10)
[00:00:10,00:00:20)
...
[00:00:50,00:01:00)

   l 注意:
   1).窗口是左闭右开的，形式为：[window_start_time,window_end_time)。
   2).Window的设定基于第一条消息的事件时间，也就是说，Window会一直按照指定的时间间隔进行划
分，不论这个Window中有没有数据，EventTime在这个Window期间的数据会进入这个Window。
   3).Window会不断产生，属于这个Window范围的数据会被不断加入到Window中，所有未被触发的
Window都会等待触发，只要Window还没触发，属于这个Window范围的数据就会一直被加入到Window中，
直到Window被触发才会停止数据的追加，而当Window触发之后才接受到的属于被触发Window的数据会
被丢弃。
   4).Window会在以下的条件满足时被触发执行：
   (1).在[window_start_time,window_end_time)窗口中有数据存在
   (2).watermark时间 >= window_end_time；
   5).一般会设置水印时间，比事件时间小几秒钟,表示最大允许数据延迟达到多久
   (即水印时间 = 事件时间 - 允许延迟时间)
   当接收到的 水印时间 >= 窗口结束时间且窗口内有数据，则触发计算
  (即事件时间 - 允许延迟时间 >= 窗口结束时间 或 事件时间 >= 窗口结束时间 + 允许延迟时间)
  
 4.代码实现
   
   数据源：
01,1586489566000
01,1586489567000
01,1586489568000
01,1586489569000
01,1586489570000
01,1586489571000
01,1586489572000
01,1586489573000
01,1586489574000
01,1586489575000
01,1586489576000
01,1586489577000
01,1586489578000
01,1586489579000
2020-04-10 11:32:46
2020-04-10 11:32:47
2020-04-10 11:32:48
2020-04-10 11:32:49
2020-04-10 11:32:50
   代码：
package com.lagou.time;

import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;

/**
 * 1、获取数据源
 * 2、转化
 * 3、声明水印（watermark）
 * 4、分组聚合，调用window的操作
 * 5、 保存处理结果
 */
public class WaterMarkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(1000L);
        env.setParallelism(1);
        DataStreamSource<String> data = env.socketTextStream("linux121", 7777);
        SingleOutputStreamOperator<Tuple2<String, Long>> mapped = data.map(new MapFunction<String, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(String value) throws Exception {
                String[] split = value.split(",");
                return new Tuple2<String, Long>(split[0], Long.valueOf(split[1]));
            }
        });
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        SingleOutputStreamOperator<Tuple2<String, Long>> watermarks = mapped.assignTimestampsAndWatermarks(new WatermarkStrategy<Tuple2<String, Long>>() {
            @Override
            public WatermarkGenerator<Tuple2<String, Long>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                return new WatermarkGenerator<Tuple2<String, Long>>() {
                    private long maxTimeStamp = Long.MIN_VALUE;

                    @Override
                    public void onEvent(Tuple2<String, Long> event, long eventTimestamp, WatermarkOutput output) {
                        maxTimeStamp = Math.max(maxTimeStamp, event.f1);
                        System.out.println("maxTimeStamp:" + maxTimeStamp + "...format:" + sdf.format(maxTimeStamp));
                    }

                    @Override
                    public void onPeriodicEmit(WatermarkOutput output) {
                        System.out.println(".....onPeriodicEmit.....");
                        long maxOutOfOrderness = 3000l;
                        output.emitWatermark(new Watermark(maxTimeStamp - maxOutOfOrderness));
                    }
                };
            }
        }.withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
            @Override
            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                return element.f1;
            }
        }));

        KeyedStream<Tuple2<String, Long>, String> keyed = watermarks.keyBy(value -> value.f0);
        WindowedStream<Tuple2<String, Long>, String, TimeWindow> windowed = keyed.window(TumblingEventTimeWindows.of(Time.seconds(4)));
        SingleOutputStreamOperator<String> result = windowed.apply(new WindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
            @Override
            public void apply(String s, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {

                String key = s;
                Iterator<Tuple2<String, Long>> iterator = input.iterator();
                ArrayList<Long> list = new ArrayList<>();
                while (iterator.hasNext()) {
                    Tuple2<String, Long> next = iterator.next();
                    list.add(next.f1);
                }
                Collections.sort(list);
                String result = "key:" + key + "..." + "list.size:" + list.size() + "...list.first:" +
                        sdf.format(list.get(0)) + "...list.last:" + sdf.format(list.get(list.size()-1)) +
                        "...window.start:" + sdf.format(window.getStart()) + "...window.end:" + sdf.format(window.getEnd());
                out.collect(result);
            }
        });

        result.print();
        env.execute();
    }
}
